{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Generator\n",
    "\n",
    "- Author: [Junseong Kim](https://www.linkedin.com/in/%EC%A4%80%EC%84%B1-%EA%B9%80-591b351b2/)\n",
    "- Design: [Junseong Kim](https://www.linkedin.com/in/%EC%A4%80%EC%84%B1-%EA%B9%80-591b351b2/)\n",
    "- Peer Review: \n",
    "- Proofread : [Chaeyoon Kim](https://github.com/chaeyoonyunakim)\n",
    "- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)\n",
    "\n",
    "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/09-Generator.ipynb)[![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/09-Generator.ipynb)\n",
    "## Overview\n",
    "\n",
    "This tutorial demonstrates how to use a **user-defined generator** (or asynchronous generator) within a LangChain pipeline to process text outputs in a streaming manner. Specifically, we’ll show how to parse a comma-separated string output into a Python list, leveraging the benefits of streaming from a language model. We will also cover asynchronous usage, showing how to adopt the same approach with async generators.\n",
    "\n",
    "By the end of this tutorial, you’ll be able to:\n",
    "- Implement a custom generator function that can handle streaming outputs.\n",
    "- Parse comma-separated text chunks into a list in real time.\n",
    "- Use both synchronous and asynchronous approaches for streaming data.\n",
    "- Integrate these parsers into a LangChain chain.\n",
    "- Optionally, explore how ```RunnableGenerator``` can be used to implement custom generator transformations within a streaming context\n",
    "\n",
    "### Table of Contents\n",
    "\n",
    "- [Overview](#overview)  \n",
    "- [Environment Setup](#environment-setup)  \n",
    "- [Implementing a Comma-Separated List Parser with a Custom Generator](#implementing-a-comma-separated-list-parser-with-a-custom-generator)  \n",
    "  - [Synchronous Parsing](#synchronous-parsing)  \n",
    "  - [Asynchronous Parsing](#asynchronous-parsing)  \n",
    "- [Using RunnableGenerator with Our Comma-Separated List Parser](#using-runnablegenerator-with-our-comma-separated-list-parser)  \n",
    "\n",
    "### References\n",
    "\n",
    "- [LangChain ChatOpenAI API reference](https://python.langchain.com/api_reference/openai/chat_models/langchain_openai.chat_models.base.ChatOpenAI.html)\n",
    "- [LangChain custom functions](https://python.langchain.com/docs/how_to/functions/)\n",
    "- [LangChain RunnableGenerator](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableGenerator.html)\n",
    "- [Python Generators Documentation](https://docs.python.org/3/tutorial/classes.html#generators)\n",
    "- [Python Async IO Documentation](https://docs.python.org/3/library/asyncio.html)\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Environment Setup\n",
    "\n",
    "Setting up your environment is the first step. See the [Environment Setup](https://wikidocs.net/257836) guide for more details.\n",
    "\n",
    "**[Note]**\n",
    "- ```langchain-opentutorial``` is a package that provides easy-to-use environment setup, along with useful functions and utilities for the tutorials.\n",
    "- See the [```langchain-opentutorial```](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) repository for more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "%pip install langchain-opentutorial"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install required packages\n",
    "from langchain_opentutorial import package\n",
    "\n",
    "package.install(\n",
    "    [\n",
    "        \"langsmith\",\n",
    "        \"langchain\",\n",
    "        \"langchain_openai\",\n",
    "        \"langchain_core\",\n",
    "        \"langchain_community\",\n",
    "    ],\n",
    "    verbose=False,\n",
    "    upgrade=False,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Environment variables have been set successfully.\n"
     ]
    }
   ],
   "source": [
    "# Set environment variables\n",
    "from langchain_opentutorial import set_env\n",
    "\n",
    "set_env(\n",
    "    {\n",
    "        \"OPENAI_API_KEY\": \"\",\n",
    "        \"LANGCHAIN_API_KEY\": \"\",\n",
    "        \"LANGCHAIN_TRACING_V2\": \"true\",\n",
    "        \"LANGCHAIN_ENDPOINT\": \"https://api.smith.langchain.com\",\n",
    "        \"LANGCHAIN_PROJECT\": \"09-Generator\",\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, you can set and load ```OPENAI_API_KEY``` from a ```.env``` file.\n",
    "\n",
    "**[Note]** This is only necessary if you haven't already set ```OPENAI_API_KEY``` in previous steps."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dotenv import load_dotenv\n",
    "\n",
    "load_dotenv(override=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Implementing a Comma-Separated List Parser with a Custom Generator\n",
    "\n",
    "When working with language models, you might receive outputs as plain text, such as comma-separated strings. To parse these into a structured format (e.g., a list) as they are generated, you can implement a custom generator function. This retains the streaming benefits — observing partial outputs in real time — while transforming the data into a more usable format."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Synchronous Parsing\n",
    "\n",
    "In this section, we define a custom generator function called ```split_into_list()```. It appends each incoming chunk (a string) to a buffer, so an item that is split across several chunks is reassembled. Whenever the buffer contains a comma, it yields the text before that comma, stripped of surrounding whitespace, as a single-element list and keeps the remainder in the buffer. When the input ends, it yields whatever is left as the final item."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Iterator, List\n",
    "\n",
    "\n",
    "# A user-defined parser that splits a stream of tokens into a comma-separated list\n",
    "def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:\n",
    "    buffer = \"\"\n",
    "    for chunk in input:\n",
    "        # Accumulate tokens in the buffer\n",
    "        buffer += chunk\n",
    "        # Whenever we find a comma, split and yield the segment\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    # Finally, yield whatever remains in the buffer\n",
    "    yield [buffer.strip()]"
   ]
  },
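  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see the buffering without calling a model, the following cell is a minimal sketch that feeds the parser a hand-written sequence of chunks. The chunk boundaries in ```fake_chunks``` are invented for illustration and deliberately do not line up with the commas. The function body is repeated from the cell above only so that this cell runs on its own."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Iterator, List\n",
    "\n",
    "\n",
    "# Same logic as split_into_list() above, repeated so this cell is self-contained\n",
    "def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:\n",
    "    buffer = \"\"\n",
    "    for chunk in input:\n",
    "        buffer += chunk\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    yield [buffer.strip()]\n",
    "\n",
    "\n",
    "# Hypothetical chunks: items are split mid-word and commas fall inside chunks\n",
    "fake_chunks = [\"Micro\", \"soft, App\", \"le,\", \" Amazon\"]\n",
    "\n",
    "# Each item is yielded whole once its trailing comma (or the end of input) arrives\n",
    "parsed = list(split_into_list(iter(fake_chunks)))\n",
    "print(parsed)  # → [['Microsoft'], ['Apple'], ['Amazon']]"
   ]
  },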
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We then construct a LangChain pipeline that:\n",
    "\n",
    "- Defines a prompt template for comma-separated outputs.\n",
    "- Uses ```ChatOpenAI``` with ```temperature=0.0``` to keep responses as consistent as possible between runs.\n",
    "- Converts the raw output to a string using ```StrOutputParser```.\n",
    "- Pipes ( **|** ) the string output into ```split_into_list()``` for parsing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.prompts import ChatPromptTemplate\n",
    "from langchain_core.output_parsers import StrOutputParser\n",
    "from langchain_openai import ChatOpenAI\n",
    "\n",
    "prompt = ChatPromptTemplate.from_template(\n",
    "    \"Write a comma-separated list of 5 companies similar to: {company}\"\n",
    ")\n",
    "\n",
    "# Initialize the model with temperature=0.0 for deterministic output\n",
    "model = ChatOpenAI(temperature=0.0, model=\"gpt-4o\")\n",
    "\n",
    "# Chain 1: Convert to a string\n",
    "str_chain = prompt | model | StrOutputParser()\n",
    "\n",
    "# Chain 2: Parse the comma-separated string into a list using our generator\n",
    "list_chain = str_chain | split_into_list"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By streaming the output through ```list_chain```, you can observe the partial results in real time. Each list item appears as soon as the parser encounters a comma in the stream."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['Microsoft']\n",
      "['Apple']\n",
      "['Amazon']\n",
      "['Facebook']\n",
      "['IBM']\n"
     ]
    }
   ],
   "source": [
    "# Stream the parsed data\n",
    "for chunk in list_chain.stream({\"company\": \"Google\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you need the complete parsed list only after generation has finished, use the ```.invoke()``` method instead of streaming."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']\n"
     ]
    }
   ],
   "source": [
    "output = list_chain.invoke({\"company\": \"Google\"})\n",
    "print(output)"
   ]
  },
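  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```.invoke()``` returns a single list because the chunks yielded by the generator are combined with ```+```, and adding lists concatenates them. This is also why the parser yields single-element lists rather than bare strings. The following cell is a rough sketch of that folding step in plain Python; it illustrates the idea and is not the actual LangChain implementation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from functools import reduce\n",
    "from operator import add\n",
    "\n",
    "# The chunks printed by the streaming example above\n",
    "streamed_chunks = [[\"Microsoft\"], [\"Apple\"], [\"Amazon\"], [\"Facebook\"], [\"IBM\"]]\n",
    "\n",
    "# Adding lists concatenates them, so folding with + produces one flat list\n",
    "merged = reduce(add, streamed_chunks)\n",
    "print(merged)  # → ['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']"
   ]
  },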
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Asynchronous Parsing\n",
    "\n",
    "The method described above works for synchronous iteration. However, some applications need **asynchronous** operations so that waiting on the model does not block the event loop. The following section shows how to achieve the same comma-separated parsing using an **async generator**.\n",
    "\n",
    "```asplit_into_list()``` works like its synchronous counterpart, accumulating tokens until a comma is encountered. The difference is that it uses the ```async for``` construct to consume an asynchronous stream."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import AsyncIterator\n",
    "\n",
    "\n",
    "async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:\n",
    "    buffer = \"\"\n",
    "    async for chunk in input:\n",
    "        buffer += chunk\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    yield [buffer.strip()]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then, you can **pipe** the asynchronous parser into a chain like the synchronous version."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "alist_chain = str_chain | asplit_into_list"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When you call ```astream()```, you can process each incoming data chunk as it becomes available within an asynchronous context."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['Microsoft']\n",
      "['Apple']\n",
      "['Amazon']\n",
      "['Facebook']\n",
      "['IBM']\n"
     ]
    }
   ],
   "source": [
    "async for chunk in alist_chain.astream({\"company\": \"Google\"}):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Similarly, you can get the entire parsed list with the asynchronous ```ainvoke()``` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']\n"
     ]
    }
   ],
   "source": [
    "result = await alist_chain.ainvoke({\"company\": \"Google\"})\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using RunnableGenerator with Our Comma-Separated List Parser\n",
    "\n",
    "Besides piping a generator function into a chain directly, you can use the ```RunnableGenerator``` class that LangChain provides for more advanced or modular streaming behavior. This approach wraps your generator logic in a Runnable that plugs easily into a chain while preserving partial output streaming. Below, we modify our **comma-separated list parser** to demonstrate how ```RunnableGenerator``` can be applied.\n",
    "\n",
    "### Advantages of RunnableGenerator\n",
    "- Modularity: Easily encapsulate your parsing logic as a Runnable component.\n",
    "- Consistency: The ```RunnableGenerator``` interface (```invoke```, ```stream```, ```ainvoke```, ```astream```) is consistent with other LangChain Runnables.\n",
    "- Extensibility: Combine multiple Runnables (e.g., ```RunnableLambda```, ```RunnableGenerator```) in sequence for more complex transformations."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Transforming the Same Parser Logic\n",
    "\n",
    "Previously, we defined ```split_into_list()``` as a standalone Python generator function. Now, let’s create an equivalent **transform** function, specifically designed for use with ```RunnableGenerator```. Our goal remains the same: we want to parse a streaming sequence of tokens into a **list** of individual items upon encountering a comma."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.runnables import RunnableGenerator\n",
    "from typing import Iterator, List\n",
    "\n",
    "\n",
    "def comma_parser_runnable(input_iter: Iterator[str]) -> Iterator[List[str]]:\n",
    "    \"\"\"\n",
    "    This function accumulates tokens from input_iter and yields\n",
    "    each chunk split by commas as a list.\n",
    "    \"\"\"\n",
    "    buffer = \"\"\n",
    "    for chunk in input_iter:\n",
    "        buffer += chunk\n",
    "        # Whenever we find a comma, split and yield\n",
    "        while \",\" in buffer:\n",
    "            comma_index = buffer.index(\",\")\n",
    "            yield [buffer[:comma_index].strip()]\n",
    "            buffer = buffer[comma_index + 1 :]\n",
    "    # Finally, yield whatever remains\n",
    "    yield [buffer.strip()]\n",
    "\n",
    "\n",
    "# Wrap it in a RunnableGenerator\n",
    "parser_runnable = RunnableGenerator(comma_parser_runnable)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can now integrate ```parser_runnable``` into the **same** prompt-and-model pipeline we used before."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "list_chain_via_runnable = str_chain | parser_runnable"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When run, partial outputs appear as single-element lists, just as with our original custom generator approach.\n",
    "\n",
    "The difference is that we’re now using ```RunnableGenerator``` to encapsulate the logic in a more modular, LangChain-native way."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['Microsoft']\n",
      "['Apple']\n",
      "['Amazon']\n",
      "['Facebook']\n",
      "['IBM']\n"
     ]
    }
   ],
   "source": [
    "# Stream partial results\n",
    "for parsed_chunk in list_chain_via_runnable.stream({\"company\": \"Google\"}):\n",
    "    print(parsed_chunk)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "langchain-opentutorial-QDzDRI-1-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
